Databricks
Supported Databricks Runtime versions: 12.2 - 16.4 (Scala 2.12)
❗ Note: Databricks Serverless is not supported for this instrumentation. You may optionally use the DBT agent instead.
To enable integration with definity on Databricks, follow these steps:
- Add the Spark Agent JAR to your compute cluster.
- Configure jobs or tasks to track with definity.
Cluster Configuration
1. Create an Init Script
Create a script to download and add the definity Spark agent to the cluster’s CLASSPATH
and set the default definity parameters. Save this script in cloud storage (e.g., S3).
#!/bin/bash
# Download the definity Spark agent into the cluster's jar directory
JAR_DIR="/databricks/jars"
mkdir -p "$JAR_DIR"
DEFINITY_JAR_URL="https://user:[email protected]/java/definity-spark-agent-[spark.version]-[agent.version].jar"
curl -o "$JAR_DIR/definity-spark-agent.jar" "$DEFINITY_JAR_URL"
export CLASSPATH=$CLASSPATH:$JAR_DIR/definity-spark-agent.jar

# Set the default definity Spark configuration for the cluster
cat > /databricks/driver/conf/00-definity.conf << EOF
spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin
spark.definity.server="https://app.definity.run"
spark.definity.api.token=YOUR_TOKEN
#spark.definity.env.name=YOUR_DEFAULT_ENV
EOF
2. Attach the Init Script to Your Compute Cluster
In the Databricks UI:
- Go to Cluster configuration → Advanced options → Init Scripts.
- Add your script with:
  - Source: S3
  - File path: s3://your-s3-bucket/init-scripts-dir/definity_init.sh
3. Configure Spark Cluster Name [Optional]
By default, the compute name is taken from the Databricks cluster name.
Navigate to Cluster configuration → Advanced options → Spark and add:
spark.definity.compute.name my_cluster_name
Note: These settings affect the default Spark session created by the cluster. Definity will monitor this session automatically.
Job Tracking
Definity offers several tracking modes on Databricks to accommodate different workflow patterns.
Multi-Task Workflow [Default]
When running Databricks workflows with multiple tasks on a cluster, Definity tracks the compute cluster separately from the logical tasks and automatically detects parameters from your workflow:
- Pipeline name: Derived from the Databricks job name
- Task name: Derived from the Databricks task key
- PIT (Point in Time): Set to the timestamp when the run starts
Single-Task Cluster
For clusters running only one task, you may prefer to create a single Definity tracking session for clarity.
To enable this mode, disable automatic tracking:
spark.definity.databricks.automaticSessions.enabled=false
Then configure the following parameters: spark.definity.pipeline.name, spark.definity.pipeline.pit, and spark.definity.task.name.
You can set these in the cluster configuration or as shown in the examples below.
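For example, in the cluster's Spark configuration (Advanced options → Spark); the pipeline name, PIT, and task name below are placeholders:
spark.definity.databricks.automaticSessions.enabled false
spark.definity.pipeline.name my_pipeline
spark.definity.pipeline.pit 2025-01-01 01:00:00
spark.definity.task.name my_task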
Programmatic Multi-Task Cluster
You can manually define task scopes within your code. First, disable automatic tracking:
spark.definity.databricks.automaticSessions.enabled=false
Start Logical Task Tracking
# Set this property to define a new task scope
spark.conf.set("spark.definity.session", f"pipeline.name={my_pipeline},pipeline.pit={pit_date},task.name={my_task}")
Stop Logical Task Tracking
For multiple logical tasks within a single session, unset the property when each task completes (shown here in Scala):
try {
// your job logic here
...
} finally {
// Signal task completion (recommended in a `finally` block to catch failures)
spark.conf.unset("spark.definity.session")
}
Note: Unsetting the session is not required for Python script jobs and notebook jobs.
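As a minimal sketch of the programmatic flow (assuming a PySpark script job; the pipeline name, PIT, and task names are placeholders), two logical tasks could be tracked back to back like this:
# Minimal sketch for a PySpark script job; pipeline, PIT, and task names are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

for task_name in ["load_raw", "build_aggregates"]:  # hypothetical logical tasks
    # Open a definity tracking session scoped to this logical task
    spark.conf.set(
        "spark.definity.session",
        f"pipeline.name=my_pipeline,pipeline.pit=2025-01-01 01:00:00,task.name={task_name}",
    )
    try:
        pass  # ... task logic here ...
    finally:
        # Close the scope; per the note above, this is optional for Python script and notebook jobs
        spark.conf.unset("spark.definity.session")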
Job Configuration Examples
Example: Jobs API
Definity parameters can be passed via the base_parameters or parameters fields, depending on the task type.
{
"tasks": [
{
"task_key": "task1",
"notebook_task": {
"notebook_path": "/Workspace/Users/user@org/task_notebook_1",
"source": "WORKSPACE",
"base_parameters": {
"spark.definity.pipeline.name": "my_pipeline",
"spark.definity.pipeline.pit": "2025-01-01 01:00:00",
"spark.definity.task.name": "task1"
}
},
"existing_cluster_id": "${DATABRICKS_CLUSTER}"
},
{
"task_key": "task2",
"notebook_task": {
"notebook_path": "/Workspace/Users/user@org/task_notebook_2",
"source": "WORKSPACE",
"base_parameters": {
"spark.definity.pipeline.name": "my_pipeline",
"spark.definity.pipeline.pit": "2025-01-01 01:00:00",
"spark.definity.task.name": "task2"
}
},
"existing_cluster_id": "${DATABRICKS_CLUSTER}"
},
{
"task_key": "python_task1",
"spark_python_task": {
"python_file": "s3://my-bucket/python_task.py",
"parameters": [
"yourArg1",
"yourArg2",
"spark.definity.task.name=python_task_1",
"spark.definity.pipeline.name=my_pipeline",
"spark.definity.pipeline.pit=2025-01-01 01:00:00"
]
},
"existing_cluster_id": "${DATABRICKS_CLUSTER}"
}
],
"format": "MULTI_TASK",
"queue": {
"enabled": true
}
}
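As a usage sketch, the payload above could be registered via the Databricks Jobs API (jobs/create). The file name, environment variables, and workspace URL below are assumptions, not part of the definity setup:
# Hypothetical submission of the job spec above to the Databricks Jobs API.
import json
import os

import requests

with open("job.json") as f:  # the JSON payload shown above, saved locally
    job_spec = json.load(f)

resp = requests.post(
    f"{os.environ['DATABRICKS_HOST']}/api/2.1/jobs/create",  # e.g. https://<workspace>.cloud.databricks.com
    headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
    json=job_spec,
)
resp.raise_for_status()
print(resp.json())  # contains the new job_id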
Example: Airflow Notebook Job
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

run_notebook = DatabricksSubmitRunOperator(
task_id="run_notebook",
json={
"notebook_task": {
"notebook_path": "/Users/[email protected]/my_notebook",
"base_parameters": {
"spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
"spark.definity.pipeline.pit": "{{ ts }}",
"spark.definity.task.name": "{{ ti.task_id }}"
},
},
"name": "notebook-job",
}
)
Example: Airflow Python Job
run_python = DatabricksSubmitRunOperator(
task_id="run_python_script",
json={
"spark_python_task": {
"python_file": "dbfs:/path/to/job.py",
"parameters": [
"spark.definity.pipeline.name={{ dag_run.dag_id }}",
"spark.definity.pipeline.pit={{ ts }}",
"spark.definity.task.name={{ ti.task_id }}"
]
},
"name": "python-job",
}
)